James McCreight
TLDR: re-rechunking takes data access time for a single streamflow gage from 2 mins to 80ms.
Sometimes the chunks are not optimized to match your access pattern and you need to re-chunk (re-rechunk?) to maintain your cool. As seen in usage_example_streamflow_timeseries.ipynb, getting a single gage can take about 2 minutes. If you have to do that over and over again for 8000 gages you could be waiting like 11 days to get it all done. Instead of doing that, rechunk!
This notebook takes the chrtout.zarr store, uses the gage_id variable to subset out just the gages, and then rechunks the subset to optimize access to the full timeseries at each point individually. Note that this is the inverse of the chunking the model uses when it natively writes the "chanobs" files, which consist of all the gages (a single chunk in space) in separate files by time (effectively one chunk per time).
# Using virtual environment in requirements_vis.txt
from climata.usgs import DailyValueIO
import dask
from dask.distributed import Client, progress, LocalCluster, performance_report
from dask_jobqueue import PBSCluster
import holoviews as hv
import hvplot
import numcodecs
import numpy as np
import pathlib
import pandas as pd
from rechunker import rechunk
import shutil
import xarray as xr

# Use the bokeh backend for holoviews and set a default scatter-plot size.
hv.extension('bokeh')
hv.opts.defaults(
    hv.opts.Scatter(width=1200, height=500) )
pd.options.plotting.backend = 'holoviews'

# Unit conversion factor: USGS reports discharge in ft^3/s, NWM in m^3/s.
cfs_2_cms = 0.028316846592

# Dask/PBS cluster configuration (Casper batch system at NCAR).
n_workers = 16          # number of dask workers requested below
n_cores = 1             # cores per worker job
queue = "casper"        # PBS queue name
cluster_mem_gb = 25     # memory per worker job, in GB
chunk_mem_factor = 0.9  # fraction of per-worker memory rechunker may use

# Disable blosc threads: dask workers already compress chunks in parallel.
numcodecs.blosc.use_threads = False

cluster = PBSCluster(
    cores=n_cores,
    memory=f"{cluster_mem_gb}GB",
    queue=queue,
    project="NRAL0017",
    walltime="05:00:00",
    death_timeout=75,)
/glade/work/jamesmcc/python_envs/379hv/lib/python3.7/site-packages/distributed/node.py:161: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 39963 instead
f"Port {expected} is already in use.\n"
# Adaptive scaling with minimum == maximum pins the cluster at a fixed size
# of n_workers; cluster.scale(n_workers) would be equivalent here.
cluster.adapt(maximum=n_workers, minimum=n_workers)
client = Client(cluster)
#dask.config.set({"distributed.dashboard.link": "/{port}/status"})
# Show the dashboard URL so execution can be watched in a browser.
client.dashboard_link
'http://128.117.208.103:39963/status'
# Lazily open the full CHRTOUT retrospective store and keep only the
# feature_ids that carry a (non-blank) gage identifier.
chrtout_file = '/glade/p/datashare/ishitas/nwm_retro_v2.1/chrtout.zarr'
ds_nwm_chrtout = xr.open_zarr(chrtout_file)

# A reach with no gage stores 15 space characters in gage_id.
blank_gage_id = ''.rjust(15).encode()
nwm_gages = ds_nwm_chrtout.gage_id.where(
    ds_nwm_chrtout.gage_id != blank_gage_id, drop=True)
nwm_gages
<xarray.DataArray 'gage_id' (feature_id: 7994)>
dask.array<where, shape=(7994,), dtype=object, chunksize=(7994,), chunktype=numpy.ndarray>
Coordinates:
elevation (feature_id) float32 dask.array<chunksize=(7994,), meta=np.ndarray>
* feature_id (feature_id) int32 3109 3923 12932 ... 1170023539 1180000535
gage_id (feature_id) |S15 dask.array<chunksize=(7994,), meta=np.ndarray>
latitude (feature_id) float32 dask.array<chunksize=(7994,), meta=np.ndarray>
longitude (feature_id) float32 dask.array<chunksize=(7994,), meta=np.ndarray>
order (feature_id) int32 dask.array<chunksize=(7994,), meta=np.ndarray>
Attributes:
coordinates: lat lon
long_name: NHD Gage Event ID from SOURCE_FEA field in Gages feature class
|
|
array([ 3109, 3923, 12932, ..., 1170022657, 1170023539,
1180000535], dtype=int32)
|
|
|
|
# Subset the full dataset down to just the gaged reaches identified above.
gaged_mask = ds_nwm_chrtout.gage_id.isin(nwm_gages)
ds_nwm_gages_0 = ds_nwm_chrtout.where(gaged_mask, drop=True)
/glade/work/jamesmcc/python_envs/379hv/lib/python3.7/site-packages/xarray/core/indexing.py:1369: PerformanceWarning: Slicing is producing a large chunk. To accept the large
chunk and silence this warning, set the option
>>> with dask.config.set(**{'array.slicing.split_large_chunks': False}):
... array[indexer]
To avoid creating the large chunks, set the option
>>> with dask.config.set(**{'array.slicing.split_large_chunks': True}):
... array[indexer]
return self.array[key]
# Work on a copy so the subset above is left untouched, and drop the crs
# variable: keeping it gives a dask type issue during rechunking (reason
# unclear; it does not surface elsewhere).
# ds_nwm_gages['crs'] = ds_nwm_gages['crs'][0]
ds_nwm_gages = ds_nwm_gages_0.copy().drop('crs')
ds_nwm_gages
<xarray.Dataset>
Dimensions: (feature_id: 7994, time: 367439)
Coordinates:
elevation (feature_id) float32 dask.array<chunksize=(7994,), meta=np.ndarray>
* feature_id (feature_id) int32 3109 3923 12932 ... 1170023539 1180000535
gage_id (feature_id) |S15 dask.array<chunksize=(7994,), meta=np.ndarray>
latitude (feature_id) float32 dask.array<chunksize=(7994,), meta=np.ndarray>
longitude (feature_id) float32 dask.array<chunksize=(7994,), meta=np.ndarray>
order (feature_id) int32 dask.array<chunksize=(7994,), meta=np.ndarray>
* time (time) datetime64[ns] 1979-02-01T01:00:00 ... 2020-12-31T23:0...
Data variables:
streamflow (time, feature_id) float64 dask.array<chunksize=(672, 106), meta=np.ndarray>
velocity (time, feature_id) float64 dask.array<chunksize=(672, 106), meta=np.ndarray>
Attributes:
TITLE: OUTPUT FROM WRF-Hydro v5.2.0-beta2
code_version: v5.2.0-beta2
featureType: timeSeries
model_configuration: retrospective
proj4: +proj=lcc +units=m +a=6370000.0 +b=6370000.0 +lat_1...
|
array([ 3109, 3923, 12932, ..., 1170022657, 1170023539,
1180000535], dtype=int32)
|
|
|
|
array(['1979-02-01T01:00:00.000000000', '1979-02-01T02:00:00.000000000',
'1979-02-01T03:00:00.000000000', ..., '2020-12-31T21:00:00.000000000',
'2020-12-31T22:00:00.000000000', '2020-12-31T23:00:00.000000000'],
dtype='datetime64[ns]')
|
|
# Target chunking: one feature per chunk, the entire timeseries in a single
# chunk -- the inverse of the native time-major chunking on disk.
dim_chunk_sizes = {'feature_id': 1, 'time': len(ds_nwm_gages.time)}

# Build the rechunker plan: the two data variables get per-gage,
# full-timeseries chunks; every other (coordinate) variable is stored as a
# single chunk covering its full shape.
chunk_plan = {}
for vv in ds_nwm_gages.variables:
    if vv in ['streamflow', 'velocity']:
        chunk_plan[vv] = tuple(dim_chunk_sizes[tt] for tt in ds_nwm_gages[vv].dims)
    else:
        chunk_plan[vv] = ds_nwm_gages[vv].shape
    # Clear the zarr chunk encoding carried over from the source store so
    # rechunker/zarr do not see a conflicting chunk specification.
    ds_nwm_gages[vv].encoding['chunks'] = None

# Apply the dask chunking once. (The original chunked both before and after
# the encoding-reset loop with identical arguments; a single call after
# resetting the encodings is sufficient.)
ds_nwm_gages = ds_nwm_gages.chunk(chunks=dim_chunk_sizes)
chunk_plan
{'streamflow': (367439, 1),
'velocity': (367439, 1),
'elevation': (7994,),
'feature_id': (7994,),
'gage_id': (7994,),
'latitude': (7994,),
'longitude': (7994,),
'order': (7994,),
'time': (367439,)}
# Output and scratch locations for the rechunked store.
dir_scratch = pathlib.Path('/glade/scratch/jamesmcc')
file_chanobs = dir_scratch / 'chanobs.zarr'
file_chanobs_temp = dir_scratch / 'chanobs_temp.zarr'

# Rechunker requires a clean temp store. The final store is deliberately
# not removed so a completed run is reused (see the exists() check below).
# NOTE(review): the commented-out list entry looks like it was meant to be
# file_chanobs (to force a rebuild) -- confirm intent.
for ff in [file_chanobs_temp]: # , file_chanobs_temp]:
    if ff.exists():
        shutil.rmtree(ff)

if not file_chanobs.exists():
    # Per-task memory budget for rechunker, kept below the per-worker job
    # memory by chunk_mem_factor.
    max_mem = f"{format(chunk_mem_factor * cluster_mem_gb / n_workers, '.2f')}GB"
    rechunk_obj = rechunk(
        ds_nwm_gages,
        chunk_plan,
        max_mem,
        str(file_chanobs),
        temp_store=str(file_chanobs_temp),
        executor="dask",)
    # Profile execution to dask-report.html; retries guard against transient
    # worker failures on the batch system.
    with performance_report(filename="dask-report.html"):
        result = rechunk_obj.execute(retries=10)

ds_chanobs = xr.open_zarr(file_chanobs)
ds_chanobs
<xarray.Dataset>
Dimensions: (feature_id: 7994, time: 367439)
Coordinates:
elevation (feature_id) float32 dask.array<chunksize=(7994,), meta=np.ndarray>
* feature_id (feature_id) int32 3109 3923 12932 ... 1170023539 1180000535
gage_id (feature_id) |S15 dask.array<chunksize=(7994,), meta=np.ndarray>
latitude (feature_id) float32 dask.array<chunksize=(7994,), meta=np.ndarray>
longitude (feature_id) float32 dask.array<chunksize=(7994,), meta=np.ndarray>
order (feature_id) int32 dask.array<chunksize=(7994,), meta=np.ndarray>
* time (time) datetime64[ns] 1979-02-01T01:00:00 ... 2020-12-31T23:0...
Data variables:
streamflow (time, feature_id) float64 dask.array<chunksize=(367439, 1), meta=np.ndarray>
velocity (time, feature_id) float64 dask.array<chunksize=(367439, 1), meta=np.ndarray>
Attributes:
TITLE: OUTPUT FROM WRF-Hydro v5.2.0-beta2
code_version: v5.2.0-beta2
featureType: timeSeries
model_configuration: retrospective
proj4: +proj=lcc +units=m +a=6370000.0 +b=6370000.0 +lat_1...
|
array([ 3109, 3923, 12932, ..., 1170022657, 1170023539,
1180000535], dtype=int32)
|
|
|
|
array(['1979-02-01T01:00:00.000000000', '1979-02-01T02:00:00.000000000',
'1979-02-01T03:00:00.000000000', ..., '2020-12-31T21:00:00.000000000',
'2020-12-31T22:00:00.000000000', '2020-12-31T23:00:00.000000000'],
dtype='datetime64[ns]')
|
|
usgs_station_id = "13317000" ## Lower Salmon River is an unmanaged flow but you can pick your own.
# gage_id values are 15-character, space-padded byte strings, so pad and
# encode the station id to match before comparing.
ds_nwm_gage = (
    ds_chanobs
    .where(ds_chanobs.gage_id == f'{usgs_station_id.rjust(15, " ")}'.encode(), drop=True))
# With per-gage chunks this load reads a single chunk instead of every
# time chunk -- this is the access-time win the rechunk was for.
%time streamflow_nwm = ds_nwm_gage.streamflow.load()
CPU times: user 103 ms, sys: 9.12 ms, total: 112 ms Wall time: 3.24 s
streamflow_nwm  # display the loaded single-gage DataArray
<xarray.DataArray 'streamflow' (time: 367439, feature_id: 1)>
array([[88.43999802],
[88.41999802],
[88.40999802],
...,
[83.52999813],
[83.50999813],
[83.49999813]])
Coordinates:
elevation (feature_id) float32 438.0
* feature_id (feature_id) int32 23559953
gage_id (feature_id) |S15 b' 13317000'
latitude (feature_id) float32 45.75
longitude (feature_id) float32 -116.3
order (feature_id) int32 7
* time (time) datetime64[ns] 1979-02-01T01:00:00 ... 2020-12-31T23:0...
Attributes:
grid_mapping: crs
long_name: River Flow
units: m3 s-1array([[88.43999802],
[88.41999802],
[88.40999802],
...,
[83.52999813],
[83.50999813],
[83.49999813]])array([438.], dtype=float32)
array([23559953], dtype=int32)
array([b' 13317000'], dtype='|S15')
array([45.75026], dtype=float32)
array([-116.323364], dtype=float32)
array([7], dtype=int32)
array(['1979-02-01T01:00:00.000000000', '1979-02-01T02:00:00.000000000',
'1979-02-01T03:00:00.000000000', ..., '2020-12-31T21:00:00.000000000',
'2020-12-31T22:00:00.000000000', '2020-12-31T23:00:00.000000000'],
dtype='datetime64[ns]')
streamflow_nwm_df = streamflow_nwm.squeeze('feature_id').to_dataframe()
param_id = "00060" # streamflow in ft3/s

# Pull the daily observed record for the same station and period from the
# USGS water service (via climata), converting to m^3/s on the way in.
data = DailyValueIO(
    start_date=pd.Timestamp(ds_nwm_chrtout.time[0].values).date(),
    end_date=pd.Timestamp(ds_nwm_chrtout.time[-1].values).date(),
    station=usgs_station_id,
    parameter=param_id,)

# create lists of date-flow values
# NOTE(review): each loop iteration overwrites both dict entries, so only
# the last series is kept -- presumably the query returns a single series
# for one station/parameter; confirm.
streamflow_usgs_d = {}
for series in data:
    streamflow_usgs_d['streamflow_obs'] = [r[1] * cfs_2_cms for r in series.data]
    streamflow_usgs_d['time'] = [pd.to_datetime(r[0]) for r in series.data]
streamflow_usgs_df = pd.DataFrame(streamflow_usgs_d).set_index('time')
# Outer-join the model and observed series on time so every timestamp from
# either source is retained, and give the columns plot-ready names.
column_names = {'streamflow': 'NWM v2.1', 'streamflow_obs': 'observed'}
joined_df = streamflow_nwm_df.join(streamflow_usgs_df, how='outer')
combo_df = joined_df.rename(columns=column_names)
def plot_water_year(water_year: int):
    """Display a scatter plot of NWM and observed flow for one water year.

    A water year runs 1 Oct of the previous calendar year through 30 Sep.
    Reads the module-level combo_df and usgs_station_id; returns None.
    """
    start = f'{water_year - 1}-10-01'
    end = f'{water_year}-10-01'
    in_year = (combo_df.index >= start) & (combo_df.index < end)
    wy_df = combo_df[in_year]
    title = (
        f'Water year {water_year}, USGS station {usgs_station_id} : '
        f'National Water Model v2.1 retrospective and USGS observed streamflows')
    plot = wy_df.plot.scatter(
        x='time', y=['NWM v2.1', 'observed'], title=title)
    display(plot.opts(ylabel='cubic meters per second', xlabel=''))
    return None
# Show a few example water years.
plot_water_year(2018)
plot_water_year(2003)
plot_water_year(1996)